1   /*
2    * Copyright (C) 2009 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.base.Preconditions.checkNotNull;
20  import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
21  
22  import com.google.common.annotations.Beta;
23  
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.Future;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  /**
31   * Utilities necessary for working with libraries that supply plain {@link
32   * Future} instances. Note that, whenver possible, it is strongly preferred to
33   * modify those libraries to return {@code ListenableFuture} directly.
34   *
35   * @author Sven Mawson
36   * @since 10.0 (replacing {@code Futures.makeListenable}, which
37   *     existed in 1.0)
38   */
39  @Beta
40  public final class JdkFutureAdapters {
41    /**
42     * Assigns a thread to the given {@link Future} to provide {@link
43     * ListenableFuture} functionality.
44     *
45     * <p><b>Warning:</b> If the input future does not already implement {@code
46     * ListenableFuture}, the returned future will emulate {@link
47     * ListenableFuture#addListener} by taking a thread from an internal,
48     * unbounded pool at the first call to {@code addListener} and holding it
49     * until the future is {@linkplain Future#isDone() done}.
50     *
51     * <p>Prefer to create {@code ListenableFuture} instances with {@link
52     * SettableFuture}, {@link MoreExecutors#listeningDecorator(
53     * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
54     * {@link AbstractFuture}, and other utilities over creating plain {@code
55     * Future} instances to be upgraded to {@code ListenableFuture} after the
56     * fact.
57     */
58    public static <V> ListenableFuture<V> listenInPoolThread(
59        Future<V> future) {
60      if (future instanceof ListenableFuture) {
61        return (ListenableFuture<V>) future;
62      }
63      return new ListenableFutureAdapter<V>(future);
64    }
65  
66    /**
67     * Submits a blocking task for the given {@link Future} to provide {@link
68     * ListenableFuture} functionality.
69     *
70     * <p><b>Warning:</b> If the input future does not already implement {@code
71     * ListenableFuture}, the returned future will emulate {@link
72     * ListenableFuture#addListener} by submitting a task to the given executor at
73     * the first call to {@code addListener}. The task must be started by the
74     * executor promptly, or else the returned {@code ListenableFuture} may fail
75     * to work.  The task's execution consists of blocking until the input future
76     * is {@linkplain Future#isDone() done}, so each call to this method may
77     * claim and hold a thread for an arbitrary length of time. Use of bounded
78     * executors or other executors that may fail to execute a task promptly may
79     * result in deadlocks.
80     *
81     * <p>Prefer to create {@code ListenableFuture} instances with {@link
82     * SettableFuture}, {@link MoreExecutors#listeningDecorator(
83     * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
84     * {@link AbstractFuture}, and other utilities over creating plain {@code
85     * Future} instances to be upgraded to {@code ListenableFuture} after the
86     * fact.
87     *
88     * @since 12.0
89     */
90    public static <V> ListenableFuture<V> listenInPoolThread(
91        Future<V> future, Executor executor) {
92      checkNotNull(executor);
93      if (future instanceof ListenableFuture) {
94        return (ListenableFuture<V>) future;
95      }
96      return new ListenableFutureAdapter<V>(future, executor);
97    }
98  
99    /**
100    * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
101    * will wait on the future to finish, and when it completes, run the
102    * listeners.  This implementation will wait on the source future
103    * indefinitely, so if the source future never completes, the adapter will
104    * never complete either.
105    *
106    * <p>If the delegate future is interrupted or throws an unexpected unchecked
107    * exception, the listeners will not be invoked.
108    */
109   private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
110       implements ListenableFuture<V> {
111 
112     private static final ThreadFactory threadFactory =
113         new ThreadFactoryBuilder()
114             .setDaemon(true)
115             .setNameFormat("ListenableFutureAdapter-thread-%d")
116             .build();
117     private static final Executor defaultAdapterExecutor =
118         Executors.newCachedThreadPool(threadFactory);
119 
120     private final Executor adapterExecutor;
121 
122     // The execution list to hold our listeners.
123     private final ExecutionList executionList = new ExecutionList();
124 
125     // This allows us to only start up a thread waiting on the delegate future
126     // when the first listener is added.
127     private final AtomicBoolean hasListeners = new AtomicBoolean(false);
128 
129     // The delegate future.
130     private final Future<V> delegate;
131 
132     ListenableFutureAdapter(Future<V> delegate) {
133       this(delegate, defaultAdapterExecutor);
134     }
135 
136     ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
137       this.delegate = checkNotNull(delegate);
138       this.adapterExecutor = checkNotNull(adapterExecutor);
139     }
140 
141     @Override
142     protected Future<V> delegate() {
143       return delegate;
144     }
145 
146     @Override
147     public void addListener(Runnable listener, Executor exec) {
148       executionList.add(listener, exec);
149 
150       // When a listener is first added, we run a task that will wait for
151       // the delegate to finish, and when it is done will run the listeners.
152       if (hasListeners.compareAndSet(false, true)) {
153         if (delegate.isDone()) {
154           // If the delegate is already done, run the execution list
155           // immediately on the current thread.
156           executionList.execute();
157           return;
158         }
159 
160         adapterExecutor.execute(new Runnable() {
161           @Override
162           public void run() {
163             try {
164               /*
165                * Threads from our private pool are never interrupted. Threads
166                * from a user-supplied executor might be, but... what can we do?
167                * This is another reason to return a proper ListenableFuture
168                * instead of using listenInPoolThread.
169                */
170               getUninterruptibly(delegate);
171             } catch (Error e) {
172               throw e;
173             } catch (Throwable e) {
174               // ExecutionException / CancellationException / RuntimeException
175               // The task is done, run the listeners.
176             }
177             executionList.execute();
178           }
179         });
180       }
181     }
182   }
183 
184   private JdkFutureAdapters() {}
185 }